/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.seari.seiec104.client;

import com.seari.seiec104.utils.JedisPoolUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.openmuc.j60870.ASdu;
import org.openmuc.j60870.IeBinaryCounterReading;
import org.openmuc.j60870.IeNormalizedValue;
import org.openmuc.j60870.IeShortFloat;
import org.openmuc.j60870.IeSinglePointWithQuality;
import org.openmuc.j60870.InformationElement;
import org.openmuc.j60870.InformationObject;
import org.openmuc.j60870.TypeId;
import org.openmuc.j60870.exception.ConnectionLostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * Data processor that mirrors IEC 60870-5-104 ASDU values into Redis hashes and,
 * when run as a task, periodically sends a general interrogation through the
 * attached {@link IEC104BaseClient}.
 *
 * <p>Values are written with {@code HMSET} into three hashes derived from the
 * processor name: {@code <name>_AI} (measured values), {@code <name>_DI}
 * (single points) and {@code <name>_DD} (counter readings). The hash field is
 * the information object address plus the element index.
 *
 * <p>A Redis connection is obtained from {@link JedisPoolUtils} for each write
 * and closed afterwards, so no connection is held between ASDUs.
 *
 * @author Rainbow
 */
public class RedisProcessor implements BaseDataProcessor, Runnable
{

    protected static Logger logger = LoggerFactory.getLogger(RedisProcessor.class);
    String processorName;
    IEC104BaseClient client;
    // volatile: written through setReady() and polled by the run() loop, which may be different threads.
    volatile boolean isReady = false;
    String redisAiPrefix;
    String redisDiPrefix;
    String redisDdPrefix;
    static final int TIME_GAP = 120; // seconds between two general interrogations
    private static final int STARTUP_DELAY = 3; // seconds to wait before the first connection check

    /**
     * Entry point of {@link BaseDataProcessor}: stores the content of the ASDU in Redis.
     *
     * @param aSdu the received ASDU
     */
    @Override
    public void processData(ASdu aSdu)
    {
        updateRedisData(aSdu);
    }

    /**
     * Dispatches the ASDU to the handler matching its type identification.
     * Unsupported types are only logged.
     *
     * @param aSdu the received ASDU
     */
    public void updateRedisData(ASdu aSdu)
    {
        TypeId typeId = aSdu.getTypeIdentification();
        switch (typeId)
        {
            case C_IC_NA_1: // general interrogation
                logger.info("总召数据");
                processInterrogationCommand(aSdu);
                break;
            case C_CI_NA_1: // counter interrogation
                logger.info("总召电度数据");
                processCounterInterrogationCommand(aSdu);
                break;
            case M_SP_NA_1: // single-point information
                logger.info("遥信数据");
                processSinglePoint(aSdu);
                break;
            case M_ME_NC_1: // measured value, short floating point
                logger.info("遥测数据");
                processMeasuredValue(aSdu);
                break;
            case M_IT_NA_1: // integrated totals (counter readings)
                logger.info("电度数据");
                processCounterValue(aSdu);
                break;
            case M_ME_NA_1: // measured value, normalized
                logger.info("整形遥测数据");
                processNormalizedMeasuredValue(aSdu);
                break;
            default:
                logger.warn("未处理数据格式：" + typeId.getDescription());
                break;
        }
    }

    /**
     * Logs a general interrogation ASDU; nothing is written to Redis.
     *
     * @param aSdu the received ASDU
     */
    public void processInterrogationCommand(ASdu aSdu)
    {
        logger.info(aSdu.toString());
    }

    /**
     * Logs a counter interrogation ASDU; nothing is written to Redis.
     *
     * @param aSdu the received ASDU
     */
    public void processCounterInterrogationCommand(ASdu aSdu)
    {
        logger.info(aSdu.toString());
    }

    /**
     * Stores single-point values as "1" (on) or "0" (off) in the DI hash.
     *
     * @param aSdu an ASDU whose elements are {@link IeSinglePointWithQuality}
     */
    public void processSinglePoint(ASdu aSdu)
    {
        InformationObject[] informationObjects = aSdu.getInformationObjects();
        Map<String, String> resultMap = new HashMap<>();
        for (InformationObject informationObject : informationObjects)
        {
            int addr = informationObject.getInformationObjectAddress();
            InformationElement[][] elementSets = informationObject.getInformationElements();
            for (int i = 0; i < elementSets.length; i++)
            {
                IeSinglePointWithQuality singlePoint = (IeSinglePointWithQuality) elementSets[i][0];
                resultMap.put(String.valueOf(addr + i), String.valueOf(singlePoint.isOn() ? 1 : 0));
            }
        }
        writeHash(redisDiPrefix, resultMap);
    }

    /**
     * Stores short floating point measured values in the AI hash.
     *
     * @param aSdu an ASDU whose elements are {@link IeShortFloat}
     */
    public void processMeasuredValue(ASdu aSdu)
    {
        InformationObject[] informationObjects = aSdu.getInformationObjects();
        Map<String, String> resultMap = new HashMap<>();
        for (InformationObject informationObject : informationObjects)
        {
            int addr = informationObject.getInformationObjectAddress();
            InformationElement[][] elementSets = informationObject.getInformationElements();
            for (int i = 0; i < elementSets.length; i++)
            {
                IeShortFloat shortFloat = (IeShortFloat) elementSets[i][0];
                resultMap.put(String.valueOf(addr + i), String.valueOf(shortFloat.getValue()));
            }
        }
        writeHash(redisAiPrefix, resultMap);
    }

    /**
     * Stores normalized measured values (as returned by
     * {@code getUnnormalizedValue()}) in the AI hash.
     *
     * @param aSdu an ASDU whose elements are {@link IeNormalizedValue}
     */
    public void processNormalizedMeasuredValue(ASdu aSdu)
    {
        InformationObject[] informationObjects = aSdu.getInformationObjects();
        Map<String, String> resultMap = new HashMap<>();
        for (InformationObject informationObject : informationObjects)
        {
            int addr = informationObject.getInformationObjectAddress();
            InformationElement[][] elementSets = informationObject.getInformationElements();
            for (int i = 0; i < elementSets.length; i++)
            {
                IeNormalizedValue normalizedValue = (IeNormalizedValue) elementSets[i][0];
                resultMap.put(String.valueOf(addr + i), String.valueOf(normalizedValue.getUnnormalizedValue()));
            }
        }
        writeHash(redisAiPrefix, resultMap);
    }

    /**
     * Stores binary counter readings in the DD hash.
     *
     * @param aSdu an ASDU whose elements are {@link IeBinaryCounterReading}
     */
    public void processCounterValue(ASdu aSdu)
    {
        InformationObject[] informationObjects = aSdu.getInformationObjects();
        Map<String, String> resultMap = new HashMap<>();
        for (InformationObject informationObject : informationObjects)
        {
            int addr = informationObject.getInformationObjectAddress();
            InformationElement[][] elementSets = informationObject.getInformationElements();
            for (int i = 0; i < elementSets.length; i++)
            {
                IeBinaryCounterReading counterReading = (IeBinaryCounterReading) elementSets[i][0];
                resultMap.put(String.valueOf(addr + i), String.valueOf(counterReading.getCounterReading()));
            }
        }
        writeHash(redisDdPrefix, resultMap);
    }

    /**
     * Writes all entries of {@code values} into the Redis hash {@code key}.
     *
     * <p>An empty map is skipped because HMSET requires at least one
     * field/value pair. The connection is obtained for this call only and
     * closed afterwards (presumably returning it to the pool behind
     * {@link JedisPoolUtils} - confirm against that class).
     *
     * @param key    name of the Redis hash
     * @param values field/value pairs to store
     */
    private void writeHash(String key, Map<String, String> values)
    {
        if (values.isEmpty())
        {
            return;
        }
        try (Jedis jedis = JedisPoolUtils.getJedis())
        {
            jedis.hmset(key, values);
        }
    }

    /**
     * Sets the processor name and derives the three Redis hash names from it.
     *
     * @param name the processor name, used as prefix of the hash names
     */
    @Override
    public void setProcessorName(String name)
    {
        this.processorName = name;
        this.redisAiPrefix = name + "_AI";
        this.redisDiPrefix = name + "_DI";
        this.redisDdPrefix = name + "_DD";
    }

    /**
     * @return the processor name set through {@link #setProcessorName(String)}
     */
    @Override
    public String getProcessorName()
    {
        return processorName;
    }

    /**
     * Sets the ready flag polled by {@link #run()}.
     *
     * @param isReady {@code true} to let {@link #run()} start sending interrogations
     */
    @Override
    public void setReady(boolean isReady)
    {
        this.isReady = isReady;
    }

    /**
     * Supervises the client: resets it while it is not ready or its socket is
     * closed, otherwise sends a general interrogation every {@link #TIME_GAP}
     * seconds. The loop ends when the executing thread is interrupted.
     */
    @Override
    public void run()
    {
        try
        {
            // Give the first connection attempt some time before checking its state.
            TimeUnit.SECONDS.sleep(STARTUP_DELAY);
        } catch (InterruptedException ex)
        {
            logger.warn(ex.getMessage(), ex);
            Thread.currentThread().interrupt();
            return;
        }
        while (!Thread.currentThread().isInterrupted())
        {
            if (!isReady)
            {
                // Not ready yet: reset the client so that it connects again.
                // NOTE(review): the original comment mentions a 5 second reconnect interval,
                // but no delay is visible here; presumably resetClient() provides it - confirm.
                client.resetClient();
                continue;
            }
            if (client.getConnection().isSocketClosed())
            {
                // Socket is closed: reconnect.
                client.resetClient();
                continue;
            }
            try
            {
                client.sendInterrogation();
                logger.info("发送总召指令>>>>>>>>");
                if (client.getDevice().getHasCounter())
                {
                    // Counter interrogation is currently disabled.
                    // client.sendCounterInterrogation();
                }
                TimeUnit.SECONDS.sleep(TIME_GAP);
            } catch (ConnectionLostException ex)
            {
                // Connection was lost while sending: reconnect.
                logger.warn(ex.getMessage(), ex);
                client.resetClient();
            } catch (InterruptedException ex)
            {
                // Restore the flag so that the loop condition ends the task.
                logger.warn(ex.getMessage(), ex);
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Attaches the IEC 104 client that {@link #run()} supervises.
     *
     * @param client the client to supervise
     */
    @Override
    public void setIecClient(IEC104BaseClient client)
    {
        this.client = client;
    }
}
